package com.katesoft.scale4j.agent;

import com.katesoft.scale4j.agent.args.AgentOptions;
import com.katesoft.scale4j.agent.exceptions.ExecutionTimeoutException;
import com.katesoft.scale4j.agent.exceptions.FailedToStarAllServicesException;
import com.katesoft.scale4j.agent.service.ServiceDetails;
import com.katesoft.scale4j.agent.service.ServiceStatus;
import com.katesoft.scale4j.common.concurrent.FixedTimeOperation;
import com.katesoft.scale4j.common.io.FileUtility;
import com.katesoft.scale4j.common.lang.RuntimeUtility;
import com.katesoft.scale4j.common.utils.StringUtility;
import com.katesoft.scale4j.log.LogFactory;
import com.katesoft.scale4j.log.Logger;
import com.katesoft.scale4j.rttp.jmx.RttpSupportStopOperationInvoker;
import com.katesoft.xml.jvmcluster.ApplicationConfigurationDocument.ApplicationConfiguration;
import com.katesoft.xml.jvmcluster.LogConfigurationDocument.LogConfiguration;
import com.katesoft.xml.jvmcluster.ServiceDocument.Service;
import net.jcip.annotations.NotThreadSafe;
import org.springframework.context.Lifecycle;
import org.springframework.core.io.Resource;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * starts user defined services.
 * <p/>
 * handles life cycle of all services and exposes some useful functionality which can be ued for external exposing like JMX.
 *
 * @author kate2007
 */
@NotThreadSafe
public class ServiceLauncher implements Lifecycle
{
    private static final int STOP_SERVICE_TIMEOUT = 15 * 1000;
    private ExecutorService executor = Executors.newCachedThreadPool();
    private Logger logger = LogFactory.getLogger(getClass());
    private ConcurrentMap<String, ServiceDetails> services;
    private ConcurrentMap<String, Process> processes;
    private Service[] serviceArray;
    private ApplicationConfiguration configuration;
    private AgentOptions options;
    private Semaphore semaphore;

    public ServiceLauncher(ApplicationConfiguration configuration,
                           AgentOptions options)
    {
        this.configuration = configuration;
        this.options = options;
        serviceArray = configuration.getServiceArray();
        semaphore = new Semaphore(serviceArray.length);
        services = new ConcurrentHashMap<String, ServiceDetails>(serviceArray.length);
        processes = new ConcurrentHashMap<String, Process>(serviceArray.length);
        for (Service s : serviceArray) {
            services.put(s.getName(), new ServiceDetails(s.toString()));
        }
    }

    /** will try to start all services, if unexpected exception occurred will shutdown all started services. */
    @Override
    public void start()
    {
        Service[] serviceArray = configuration.getServiceArray();
        //
        logger.debug("starting %s services", serviceArray.length);
        for (Service service : serviceArray) {
            startProcess(service);
        }
        try {
            if (options.getTimeout() > 0) {
                Thread.sleep((options.getTimeout() / 2) * 1000);
                boolean b = semaphore.tryAcquire(serviceArray.length, (options.getTimeout() / 2), TimeUnit.SECONDS);
                if (!b) {
                    stopAllServices(false);
                    throw new ExecutionTimeoutException(
                        String.format("services %s are not finished execution, timeout = [%s secs]", services.keySet().toString(), options.getTimeout()));
                }
                else {
                    semaphore.release(serviceArray.length);
                }
            }
            else {
                semaphore.acquire(serviceArray.length);
                semaphore.release(serviceArray.length);
            }
        }
        catch (InterruptedException e) {
            logger.error(e);
            stopAllServices(false);
            throw new FailedToStarAllServicesException(e);
        }
    }

    @Override
    public void stop()
    {
        stopAllServices(false);
        executor.shutdown();
    }

    /**
     * modify service status(from starting to running for example).
     *
     * @param key    service name
     * @param status new status of process
     */
    protected void changeServiceStatus(String key,
                                       ServiceStatus status)
    {
        logger.info("changing status of service %s %s:=>%s", key, services.get(key).getStatus().name(), status.name());
        services.get(key).setStatus(status);
    }

    /**
     * will pass log4j configuration system parameter if necessary.
     *
     * @param logConfiguration xml fragment wrapper.
     * @param processBuilder   JVM process builder.
     */
    protected void setLogConfiguration(LogConfiguration logConfiguration,
                                       JavaProcessBuilder processBuilder)
    {
        if (logConfiguration != null && !StringUtility.isEmpty(logConfiguration.getLocation())) {
            try {
                Resource resource = FileUtility.resolve(logConfiguration.getLocation());
                processBuilder.systemProperty("log4j.configuration", resource.getURL().toExternalForm());
            }
            catch (IOException e) {
                logger.error("failed to resolve log configuration for location %s", logConfiguration.getLocation());
            }
        }
    }

    /**
     * start service process from XML configuration.
     *
     * @param service xml configuration wrapper
     */
    protected void startProcess(Service service)
    {
        if (services.get(service.getName()).getStatus().isRunning()) {
            throw new IllegalStateException(String.format("service = %s is already running", service.getName()));
        }
        JavaProcessBuilder p1 = new JavaProcessBuilder(configuration, service);
        setLogConfiguration(service.getLogConfiguration(), p1);
        startProcessThread(service.getName(), p1);
    }

    /**
     * launch service in separate JVM.
     *
     * @param serviceName unique service's name.
     * @param p           java process builder to be used for JVM creation.
     * @return thread that was used for service launch.
     */
    protected Thread startProcessThread(final String serviceName,
                                        final JavaProcessBuilder p)
    {
        p.classpath(System.getProperty("java.class.path"));
        p.systemProperty(RuntimeUtility.VAR_SERVICE_ID, serviceName);
        Runnable runnable = new Runnable()
        {
            @Override
            public void run()
            {
                logger.debug("launching JVM for service[%s]", serviceName);
                try {
                    semaphore.acquire();
                    Process process = p.launch(System.out, System.err);
                    processes.put(serviceName, process);
                    changeServiceStatus(serviceName, ServiceStatus.RUNNING);
                    process.waitFor();
                    logger.info("[process=%s, service=%s] finished execution with exit_code=%s", process, serviceName, process.exitValue());
                    if (process.exitValue() != 0) {
                        if (services.get(serviceName).getStatus() != ServiceStatus.TERMINATED) {
                            changeServiceStatus(serviceName, ServiceStatus.FAILED_TO_START);
                            logger.error("[process=%s, service=%s] terminated with bad exit value %s, please check logs", process, serviceName,
                                         process.exitValue());
                        }
                    }
                    else {
                        logger.info("execution of %s was successful", serviceName);
                        changeServiceStatus(serviceName, ServiceStatus.TERMINATED);
                    }
                    RuntimeUtility.gc();
                }
                catch (IOException e) {
                    logger.error(e);
                }
                catch (InterruptedException e) {
                    logger.error(e);
                    Thread.interrupted();
                }
                finally {
                    semaphore.release();
                }
            }
        };
        Thread serviceThread = new Thread(runnable);
        serviceThread.setDaemon(false);
        serviceThread.setName(String.format("jvmcluster_%s_launcher_thread", serviceName));
        serviceThread.start();
        return serviceThread;
    }

    /** @return true if there are running services. */
    @Override
    public boolean isRunning()
    {
        return runningCount() > 0;
    }

    /** @return number of running services */
    public int runningCount()
    {
        return services.size() - semaphore.availablePermits();
    }

    /** @return collection with service details. */
    public ConcurrentMap<String, ServiceDetails> getServices()
    {
        return services;
    }

    /**
     * shutdown all running services(each service in separate thread)
     *
     * @param forceKill do we need to destroy process or need to send shutdown command and wait for shutdown finish?
     */
    public void stopAllServices(final boolean forceKill)
    {
        long time = System.currentTimeMillis();
        final CountDownLatch stopCountDownLatch = new CountDownLatch(services.size());
        for (final String s : services.keySet()) {
            Thread stopThread = new Thread(new Runnable()
            {
                @Override
                public void run()
                {
                    try {
                        stopService(s, forceKill);
                    }
                    catch (Exception e) {
                        logger.error(e);
                    }
                    finally {
                        stopCountDownLatch.countDown();
                    }
                }
            });
            stopThread.setName(String.format("jvmcluster_%s_stop_thread", s));
            stopThread.start();
        }
        try {
            stopCountDownLatch.await();
            logger.debug("all services stopped in [%s] milisecs", (System.currentTimeMillis() - time));
        }
        catch (InterruptedException e) {
            logger.error(e);
        }
    }

    /**
     * allows you to stop application service by name.
     * <p/>
     * This functionality can be used for JMX exposing.
     *
     * @param name      service's unique name.
     * @param forceKill do we need to destroy process or need to send shutdown command and wait for shutdown finish?
     * @throws Exception if unable to connect to remote jmx server or if unable to invoke remote operation or if unable to destroy process
     */
    public void stopService(final String name,
                            boolean forceKill) throws Exception
    {
        logger.info("stopping service = %s", name);
        ServiceStatus status = services.get(name).getStatus();
        if (status.isActive()) {
            changeServiceStatus(name, ServiceStatus.TERMINATED);
            Process process = processes.get(name);
            if (process != null) {
                if (forceKill) { process.destroy(); }
                else {
                    try {
                        for (Service service : serviceArray) {
                            if (service.getName().equals(name)) {
                                int jmxPort = service.getJmxPort().intValue();
                                final RttpSupportStopOperationInvoker proxy =
                                    new RttpSupportStopOperationInvoker(String.format("service:jmx:rmi:///jndi/rmi://localhost:%s/jmxrmi", jmxPort));
                                logger.info("calling remote jmx stopService method for service[%s]", name);
                                FixedTimeOperation operation = new FixedTimeOperation(executor, new Callable<Object>()
                                {
                                    @Override
                                    public Object call() throws Exception
                                    {
                                        if (proxy.invoke()) {
                                            logger.info("service[%s] stopped using jmx stopService method ", name);
                                        }
                                        return null;
                                    }
                                }, STOP_SERVICE_TIMEOUT);
                                operation.call();
                                break;
                            }
                        }
                    }
                    catch (TimeoutException e) {
                        logger.warn("unable to stop service %s, timeout = %s excised", name, STOP_SERVICE_TIMEOUT);
                    }
                    finally {
                        logger.info("calling destroy on %s for service=%s", process, name);
                        process.destroy();
                    }
                }
            }
        }
        else { logger.info("service=%s is not active", name); }
    }

    /**
     * allows you to start application service by name.
     * <p/>
     * This functionality can be used for JMX exposing.
     *
     * @param name service's unique name.
     */
    public void startService(String name)
    {
        logger.info("manual start called for service = %s", name);
        for (int i = 0; i < configuration.getServiceArray().length; i++) {
            Service service = configuration.getServiceArray(i);
            if (service.getName().equalsIgnoreCase(name)) {
                startProcess(service);
                break;
            }
        }
    }

    /** @return if one of application services failed to start. */
    public boolean isStartupFailed()
    {
        return !getFailedServices().isEmpty();
    }

    public Collection<ServiceDetails> getFailedServices()
    {
        Collection<ServiceDetails> failedServices = new LinkedHashSet<ServiceDetails>();
        for (ServiceDetails next : services.values()) {
            if (next.getStatus() != null && next.getStatus().isFailedToStart()) { failedServices.add(next); }
        }
        return failedServices;
    }

    public void join() throws InterruptedException
    {
        semaphore.acquire(services.size());
        semaphore.release(services.size());
    }
}
